{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {
    "collapsed": false,
    "hideCode": false,
    "hidePrompt": false
   },
   "outputs": [],
   "source": [
    "import sys\n",
    "spark_home = '/Users/arj/Make/bin/spark-2.1.0-bin-hadoop2.7'\n",
    "sys.path.insert(0, spark_home + \"/python\")\n",
    "\n",
    "from pyspark import SparkConf, SparkContext\n",
    "\n",
    "urlMaster = 'spark://Arjuns-MacBook-Pro.local:7077'\n",
    "\n",
    "conf = (\n",
    "    SparkConf()\n",
    "        .setAppName('spark.app')\n",
    "        .setMaster(urlMaster)\n",
    ")\n",
    "sc = SparkContext(conf=conf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 103,
   "metadata": {
    "collapsed": false,
    "hideCode": false,
    "hidePrompt": false
   },
   "outputs": [],
   "source": [
    "# building corpus\n",
    "from nltk.corpus import brown\n",
    "sentences = brown.sents()[:1000]\n",
    "corpus = sc.parallelize(sentences).map(lambda s: ' '.join(s))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true,
    "hideCode": false,
    "hidePrompt": false
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 108,
   "metadata": {
    "collapsed": false,
    "hideCode": false,
    "hidePrompt": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[[Chunk('The Fulton County Grand Jury/NP'), Chunk('said/VP'), Chunk('Friday an investigation/NP'), Chunk('of/PP'), Chunk('Atlanta/NP'), Chunk('recent primary election/NP'), Chunk('produced/VP'), Chunk('no evidence/NP'), Chunk('that any irregularities/NP'), Chunk('took/VP'), Chunk('place/NP')], [Chunk('The jury/NP'), Chunk('further said/VP'), Chunk('in/PP'), Chunk('term-end presentments/NP'), Chunk('that/PP'), Chunk('the City Executive Committee/NP'), Chunk('had/VP'), Chunk('over-all charge/NP'), Chunk('of/PP'), Chunk('the election/NP'), Chunk('deserves/VP'), Chunk('the praise/NP'), Chunk('thanks/NP'), Chunk('of/PP'), Chunk('the City/NP'), Chunk('of/PP'), Chunk('Atlanta/NP'), Chunk('for/PP'), Chunk('the manner/NP'), Chunk('in/PP'), Chunk('which the election/NP'), Chunk('was conducted/VP')]]\n"
     ]
    }
   ],
   "source": [
    "from itertools import chain\n",
    "from pattern.text.en import parsetree\n",
    "\n",
    "def get_chunks(sentence):\n",
    "    return list(chain.from_iterable(\n",
    "            map(\n",
    "                lambda sentence: sentence.chunks, \n",
    "                parsetree(sentence)\n",
    "            )\n",
    "        ))\n",
    "\n",
    "chunks = corpus \\\n",
    "    .map(get_chunks)\n",
    "    \n",
    "print (chunks.take(2))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "hideCode": false,
    "hidePrompt": false
   },
   "source": [
    "#### nouns word count"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 163,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[u'fulton', u'county', u'grand', u'jury', u'friday', u'investigation', u'atlanta', u'primary', u'election', u'evidence']\n"
     ]
    }
   ],
   "source": [
    "def match_noun_like_pos(pos):\n",
    "    import re\n",
    "    return re.match(re.compile('^N.*'), pos) != None\n",
    "\n",
    "noun_like = chunks \\\n",
    "    .flatMap(lambda chunks: chunks) \\\n",
    "    .filter(lambda chunk: chunk.part_of_speech == 'NP') \\\n",
    "    .flatMap(lambda chunk: chunk.words) \\\n",
    "    .filter(lambda word: match_noun_like_pos(word.part_of_speech)) \\\n",
    "    .map(lambda word: word.string.lower())\n",
    "\n",
    "print (noun_like.take(10))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 171,
   "metadata": {
    "collapsed": false,
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[(u'state', 85), (u'city', 58), (u'administration', 52), (u'president', 52), (u'mr.', 52), (u'year', 46), (u'committee', 39), (u'bill', 39), (u'states', 37), (u'county', 35)]\n"
     ]
    }
   ],
   "source": [
    "noun_word_count = noun_like \\\n",
    "    .map(lambda word: (word, 1)) \\\n",
    "    .reduceByKey(lambda a, b: a + b) \\\n",
    "    .sortBy(lambda d: d[1], ascending=False)\n",
    "    \n",
    "print (noun_word_count.take(10))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##### LDA"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 172,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.mllib.clustering import LDA, LDAModel\n",
    "from pyspark.mllib.linalg import Vectors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 176,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[u'state', u'city', u'administration', u'president', u'mr.']"
      ]
     },
     "execution_count": 176,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "vocabulary = noun_word_count.map(lambda w: w[0]).collect()\n",
    "vocabulary[:5]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 205,
   "metadata": {
    "collapsed": false,
    "scrolled": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[[u'fulton',\n",
       "  u'county',\n",
       "  u'grand',\n",
       "  u'jury',\n",
       "  u'friday',\n",
       "  u'investigation',\n",
       "  u'atlanta',\n",
       "  u'primary',\n",
       "  u'election',\n",
       "  u'evidence',\n",
       "  u'irregularities',\n",
       "  u'place']]"
      ]
     },
     "execution_count": 205,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "doc_nouns = chunks \\\n",
    "    .map(lambda chunks: filter(\n",
    "            lambda chunk: chunk.part_of_speech == 'NP',\n",
    "            chunks\n",
    "        )) \\\n",
    "    .filter(lambda chunks: len(chunks) > 0) \\\n",
    "    .map(lambda chunks: list(chain.from_iterable(map(\n",
    "            lambda chunk: chunk.words,\n",
    "            chunks\n",
    "        )))) \\\n",
    "    .map(lambda words: filter(\n",
    "            lambda word: match_noun_like_pos(word.part_of_speech),\n",
    "            words\n",
    "        )) \\\n",
    "    .filter(lambda words: len(words) > 0) \\\n",
    "    .map(lambda words: map(\n",
    "            lambda word: word.string.lower(),\n",
    "            words,\n",
    "        ))\n",
    "    \n",
    "doc_nouns.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 207,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def get_vector_representation(nouns, vocab):\n",
    "    return  Vectors.dense(map(\n",
    "        lambda word: 1.0 if word in nouns else 0.0,\n",
    "        vocab\n",
    "    ))\n",
    "\n",
    "doc_vecs = doc_nouns \\\n",
    "    .map(lambda nouns: get_vector_representation(set(nouns), vocabulary)) \\\n",
    "    .zipWithIndex().map(lambda x: [x[1], x[0]])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 209,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "ldaModel = LDA.train(doc_vecs, k=3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 253,
   "metadata": {
    "collapsed": false,
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Learned topics (as distributions over vocab of 2279 words):\n",
      "Topic 0:\n",
      "state: 27.6350480347\n",
      "city: 18.9516713343\n",
      "mr.: 17.5439356649\n",
      "president: 16.8568307883\n",
      "year: 15.4074257761\n",
      "committee: 14.0324129502\n",
      "administration: 13.9553862346\n",
      "bill: 12.960995307\n",
      "election: 11.6073234867\n",
      "house: 11.5578186886\n",
      "-----------\n",
      "Topic 1:\n",
      "state: 26.203168362\n",
      "administration: 18.269679186\n",
      "year: 16.2404114273\n",
      "president: 16.0424256301\n",
      "city: 14.5047677994\n",
      "bill: 13.728963992\n",
      "committee: 13.6235523038\n",
      "mr.: 13.6074814177\n",
      "tax: 12.0525070432\n",
      "states: 11.6004234735\n",
      "-----------\n",
      "Topic 2:\n",
      "state: 27.1617836034\n",
      "city: 17.5435608663\n",
      "president: 16.1007435816\n",
      "mr.: 15.8485829174\n",
      "administration: 15.7749345795\n",
      "states: 14.7379675751\n",
      "year: 13.3521627966\n",
      "house: 12.5135762168\n",
      "election: 12.140906292\n",
      "united: 11.8705878794\n",
      "-----------\n"
     ]
    }
   ],
   "source": [
    "print(\"Learned topics (as distributions over vocab of \" + str(ldaModel.vocabSize())\n",
    "      + \" words):\")\n",
    "topics = ldaModel.topicsMatrix()\n",
    "for topic in range(3):\n",
    "    print(\"Topic \" + str(topic) + \":\")\n",
    "    topic_words = sorted(map(\n",
    "        lambda d: (topics[d[0]][topic], d[1]),\n",
    "        enumerate(vocabulary)\n",
    "    ), reverse=True)\n",
    "    for word in topic_words[:10]:\n",
    "        print(\"{}: {}\".format(word[1], word[0]))\n",
    "    print '-----------'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "hide_code_all_hidden": false,
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
